In [ ]:
epochs = 10
# このチュートリアルの目的はトレーニングそのものではな無いので、スピードアップのためにデータの一部しか使いません。
# データセットをサイズを大きくした時にどうなるか興味のある方は以下の数字を大きくしてみてください。
n_train_items = 640
n_test_items = 640
機械学習をベースにした何らかのサービス(MLaaS)を提供したいとします。そのためにはまずモデルのトレーニングを行う必要がありますが、そのためには外部のパートナー企業が保有するデータにアクセスする必要がある、なんて言う事はよくあります。一方で医療や金融の世界ではモデルもデータもとても重要です。モデルのパラメータはビジネスの根幹(知財)に関わりますし、ユーザーデータ(個人情報)の利用は厳しく規制されています。
こういった状況下における潜在的な解決策の一つは、モデルもデータもどちらも暗号化し、暗号化されたままのデータでモデルのトレーニングを行うことです。この方法なら、モデルのトレーニングを行う企業はユーザーの個人情報を取得する必要はありませんし、サービスを利用する医療機関/金融機関も企業の知財であるモデル(のウェイト)を知る事はありません。
暗号化されたデータを使ってのコンピューテーションを行うための手法はいくつか存在しますが、その中でも"Secure Multi-Party Computation" (SMPC)、"Homomorphic Encryption" (FHE/SHE) 、それに"Functional Encryption"(FE)は良く知られています。ここでは、チュートリアル5で紹介した"Secure Multi-Party Computation"を特に紹介します。Secure Multi-Party ComputationはSecureNNやSPDZといった暗号化プロトコルを使い、shares
という概念によって暗号化を行います。shares
秘密鍵のような数値を関係各社で分割して持ち回るような概念です。
ところで、このチュートリアルの設定は次の通りです。あなたがサーバー(モデルのトレーニングを行う主体)で、N人のワーカーに分散しているデータを使ってモデルのトレーニングを行いたいとします。サーバーではshares
によってモデルを分割し、複数のワーカーに送信します。各ワーカーも自分たちのデータをshares
によって分割し、ワーカー間で共有します。このケースではAliceとBobの2人です。shares
を交換しあうことによって、各自は自身のshare
と他人のshare
を保持します。こうする事によって、サーバーは適切な暗号化プロトコルを使ってモデルのトーレニングを開始でき、トレーニングが終了後には、全てのshares
をサーバーに戻すことによって、サーバーはモデルを複合化できます。次の画像はこの一連のプロセスを図示しています。
この設定の例として、AliceとBobがMNISTデータセットを分割して保持していると仮定します。では、モデルをトレーニングして、手書き文字の分類タスクをやってみましょう。
Author:
In [ ]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import time
トレーニングに関するハイパーパラメータを定義します。これらは全てパブリック変数です。
In [ ]:
class Arguments():
def __init__(self):
self.batch_size = 64
self.test_batch_size = 64
self.epochs = epochs
self.lr = 0.02
self.seed = 1
self.log_interval = 1 # 一つのバッチ毎にログを表示します
self.precision_fractional = 3
args = Arguments()
_ = torch.manual_seed(args.seed)
PySyft関連のライブラリをインポートします。また、 alice
と bob
という名前でリモートワーカーを作成しておきましょう。更に暗号化プロトコルのプリミティブを提供するリモートワーカーを crypto_provider
という名前で作成しておきます。
In [ ]:
import syft as sy # Pysyftライブラリをインポート
hook = sy.TorchHook(torch) # PyTorchをHookして、PyTorchを拡張します
# simulation functions
def connect_to_workers(n_workers):
return [
sy.VirtualWorker(hook, id=f"worker{i+1}")
for i in range(n_workers)
]
def connect_to_crypto_provider():
return sy.VirtualWorker(hook, id="crypto_provider")
workers = connect_to_workers(n_workers=2)
crypto_provider = connect_to_crypto_provider()
shares
によって分割されたデータへのアクセスではここで、ユーティリティ(便利な)関数を使って次の状況を実現しましょう。MNISTデータセットはバラバラに複数のリモートワーカーによって保持されていると仮定していた事を思い出してください。ワーカーは自分のデータをバッチサイズに分割し、shares
によって暗号化の上、shares
を互いに持ち合います。最終的に返されるオブジェクトはshares
によって暗号化されたデータセットのiterable
です。私たちはこれをprivate data loaderと呼びます。
注記: このプロセス全体を通してローカルワーカー(サーバー、ここでは私たち)は決してデータを見ることはありません。
次に暗号化されたデータセットを使って今までと同様の手順でトレーニングを実行します。入力データも正解ラベルもshares
によって暗号化されています。
In [ ]:
def get_private_data_loaders(precision_fractional, workers, crypto_provider):
def one_hot_of(index_tensor):
"""
Transform to one hot tensor
Example:
[0, 3, 9]
=>
[[1., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
[0., 0., 0., 1., 0., 0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0., 0., 0., 0., 0., 1.]]
"""
onehot_tensor = torch.zeros(*index_tensor.shape, 10) # 10 classes for MNIST
onehot_tensor = onehot_tensor.scatter(1, index_tensor.view(-1, 1), 1)
return onehot_tensor
def secret_share(tensor):
"""
Transform to fixed precision and secret share a tensor
"""
return (
tensor
.fix_precision(precision_fractional=precision_fractional)
.share(*workers, crypto_provider=crypto_provider, requires_grad=True)
)
transformation = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
train_loader = torch.utils.data.DataLoader(
datasets.MNIST('../data', train=True, download=True, transform=transformation),
batch_size=args.batch_size
)
private_train_loader = [
(secret_share(data), secret_share(one_hot_of(target))) for i, (data, target) in enumerate(train_loader) if i < n_train_items / args.batch_size
]
test_loader = torch.utils.data.DataLoader(
datasets.MNIST('../data', train=False, download=True, transform=transformation),
batch_size=args.test_batch_size
)
private_test_loader = [
(secret_share(data), secret_share(target.float()))
for i, (data, target) in enumerate(test_loader)
if i < n_test_items / args.test_batch_size
]
return private_train_loader, private_test_loader
private_train_loader, private_test_loader = get_private_data_loaders(
precision_fractional=args.precision_fractional,
workers=workers,
crypto_provider=crypto_provider
)
これは今回使うモデルです。シンプルなモデルですが、まずまず良い感じに動作する事が知られています。 詳細はit has proved to perform reasonably well on MNISTを参照してください。
In [ ]:
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.fc1 = nn.Linear(28 * 28, 128)
self.fc2 = nn.Linear(128, 64)
self.fc3 = nn.Linear(64, 10)
def forward(self, x):
x = x.view(-1, 28 * 28)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
In [ ]:
def train(args, model, private_train_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(private_train_loader): # <-- プライバシーに配慮したデータローダーです
start_time = time.time()
optimizer.zero_grad()
output = model(data)
# loss = F.nll_loss(output, target) <-- SMPC環境下では再現が難しいため、変更します
batch_size = output.shape[0]
loss = ((output - target)**2).sum().refresh()/batch_size
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
loss = loss.get().float_precision()
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\tTime: {:.3f}s'.format(
epoch, batch_idx * args.batch_size, len(private_train_loader) * args.batch_size,
100. * batch_idx / len(private_train_loader), loss.item(), time.time() - start_time))
テスト関数に変更はありません
In [ ]:
def test(args, model, private_test_loader):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in private_test_loader:
start_time = time.time()
output = model(data)
pred = output.argmax(dim=1)
correct += pred.eq(target.view_as(pred)).sum()
correct = correct.get().float_precision()
print('\nTest set: Accuracy: {}/{} ({:.0f}%)\n'.format(
correct.item(), len(private_test_loader)* args.test_batch_size,
100. * correct.item() / (len(private_test_loader) * args.test_batch_size)))
In [ ]:
model = Net()
model = model.fix_precision().share(*workers, crypto_provider=crypto_provider, requires_grad=True)
optimizer = optim.SGD(model.parameters(), lr=args.lr)
optimizer = optimizer.fix_precision()
for epoch in range(1, args.epochs + 1):
train(args, model, private_train_loader, optimizer, epoch)
test(args, model, private_test_loader)
ジャジャーン!全てが暗号化された環境下で、MNISTのトレーニングをする事ができました。データは全体のごく一部しか使っていませんが、75%の精度を達成する事ができています。
第1はコンピューテーションに掛かる時間です。もう既に気がついていることと思いますが、プレーンのトレーニングと比べて随分と遅いです。バッチサイズ、64、でバッチあたりの時間が、ピュアなPyTorchなら13ミリ秒しか掛からないところを、3.2秒も掛かっています。もちろんこれは大きな問題でです。ですが、思い出してください。私たちは全てをリモート、かつ暗号化された環境下で行いました。ただの一つのデータさせ流出はさせていません。1つのデータ処理に掛かる時間は50ミリ秒ほどです。それほど悪くないと思いませんか?ここで問うべき問題は、暗号化環境下でのトレーニングしか選択肢がない時に、使えるか使えないかという事です。例えば実運用環境で、推論に50ミリ秒なら全然問題ありませんよね。
ところで、一番のボトルネックはアクティベーション関数です。例えばディープラーニングでよく使われるreluですが、暗号化環境下での"比較"とSecureNNプロトコルを使用するので、時間が掛かります。CryptoNetsなどの暗号化環境下でのコンピューテーション関連の論文に見られるようにreluをquadraticアクティベーションで置き換えると3.2秒掛かっていたバッチ処理が1.2秒になったという報告もあります。
いずれにせよ、現時点でのベストプラクティスとしては、必要な部分だけを暗号化するということです。次のチュートリアルではその方法を紹介します。
In [ ]:
model.fc3.bias
データも覗いてみましょう
In [ ]:
first_batch, input_data = 0, 0
private_train_loader[first_batch][input_data]
ご覧の通り、AutogradTensorが登場しています。FixedPrecisionTensorとPyTorchのTensorのラッパーの間に存在しています。これはデータが整数として存在していることを示唆しています。AutogradTensorの目的は、暗号化されたデータでの演算が行われる際に、コンピューテーショングラフを保持することにあります。AutogradTensorは、そのままでは整数では動かない全ての関連関数を上書きます。
例としては、Beaver triples trickを使って実装されている乗算などがあります。乗算の微分以上の複雑な微分を自分でやるは避けたいですよね。$\partial_b (a \cdot b) = a \cdot \partial b$
ここに微分計算の実装の例を挙げておきます。
class MulBackward(GradFunc):
def __init__(self, self_, other):
super().__init__(self, self_, other)
self.self_ = self_
self.other = other
def gradient(self, grad):
grad_self_ = grad * self.other
grad_other = grad * self.self_ if type(self.self_) == type(self.other) else None
return (grad_self_, grad_other)
勾配ベクトル関連の実装に興味がある方はtensors/interpreters/gradients.py
を参照してください。
ところで、コンピューテーショングラフに関してですが、コピーはローカルに存在し、フォーワードとバックワードの指示はサーバーから受けます。これは想定通りですね。
ここで、セキュリティについて一つ言及しておきましょう。私たちが想定しているのは、悪意は無い相手です。悪意が無いの意味は、データやモデルが暗号化されずに送られてきてしまったら興味本位で見てしまう可能性はあるけど、意図的に全てを台無しにしようと思っているわけではな無い、という事です。というのも、PySyftやSMPCを使えば、相手はデータの中身を見ることは出来ないけれど、悪意があれば、shares
を改編するなどしてトレーニングをストップさせることは可能です。SMPCでは悪意を持った相手に対する対応策は現在ありません。
これに加えて、Secure Multi-Party Computationを使うことによってデータそのものへのアクセスは出来なくなっていたとしても、平文で入力データをサーバーに送れば入力データや推論結果が盗み見られてしまう危険はあります。そして、その入力データによっては何がトレーニングデータとして使われたのか明らかになってしまう場合もあります。得に、具体的な学習データを復元させようとするmembership attacksと呼ばれる攻撃手法に対しては(特別な)防御メカニズムを持っていません。また、モデルが予期せぬ、本来覚えるべきでないことを学んでしまうケースや、model inversion attackに対する脆弱性は一般のディープラーニングと同じです。
これらの脅威に対する一般的な対応策は、Differential Privacyを使うことです。Secure Multi-Party Computationとの相性もよく、とても興味深いセキュリティを提供します。私たちは現在Differential Privacyを使った実装に取り組んでいる最中です。準備が出来たら応用例を紹介します。
SMPCを使ったモデルのトレーニングは、内部的には複雑なことをやっているのですが、コードという点では難しいものではありません。 これを踏まえて、どんな時に暗号化されたデータでのディープラーニングが必要なのか、是非考えてみてください。暗号化されたデータでのディープラーニングは通常のそれより遅いですから、どんな時にコンピューテーションのコストが正当化できるを考えるのはとても重要なことです。
もし、このチュートリアルを気に入って、プライバシーに配慮した非中央集権的なAI技術や付随する(データやモデルの)サプライチェーンにご興味があって、プロジェクトに参加したいと思われるなら、以下の方法で可能です。
一番簡単に貢献できる方法はこのGitHubのレポジトリにスターを付けていただくことです。スターが増えると露出が増え、より多くのデベロッパーにこのクールな技術の事を知って貰えます。
最新の開発状況のトラッキングする一番良い方法はSlackに入ることです。 下記フォームから入る事ができます。 http://slack.openmined.org
コミュニティに貢献する一番良い方法はソースコードのコントリビューターになることです。PySyftのGitHubへアクセスしてIssueのページを開き、"Projects"で検索してみてください。参加し得るプロジェクトの状況を把握することができます。また、"good first issue"とマークされているIssueを探す事でミニプロジェクトを探すこともできます。
もし、ソースコードで貢献できるほどの時間は取れないけど、是非何かサポートしたいという場合は、寄付をしていただくことも可能です。寄附金の全ては、ハッカソンやミートアップの開催といった、コミュニティ運営経費として利用されます。
In [ ]:
In [ ]: